package com.stay4it.rxjava;

import java.util.concurrent.TimeUnit;

import rx.Observable;
import rx.Subscriber;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.schedulers.Schedulers;

/**
 * RxJava错误处理（Error Handling）操作符——Catch/Retry
 * 本节参考文章：http://blog.csdn.net/jdsjlzx/article/details/52448173
 * 
 * catch系列操作符：onErrorReturn、onErrorResumeNext和onExceptionResumeNex
 * retry系列操作符：retry和retryWhen
 */
public class RxJava07 {
	
	private static Observable<String> createObserver() {
	    return Observable.create(new Observable.OnSubscribe<String>() {
	        @Override
	        public void call(Subscriber<? super String> subscriber) {
	            for (int i = 1; i <= 6; i++) {
	                if (i < 3) {
	                    subscriber.onNext(i+"");
	                } else {
	                    subscriber.onError(new Throwable("Throw error"));
	                }
	            }
	        }
	    });
	}
	
	private static Observable<String> createObserver2() {
	    return Observable.create(new Observable.OnSubscribe<String>() {
	        @Override
	        public void call(Subscriber<? super String> subscriber) {
	            for (int i = 1; i <= 6; i++) {
	                if (i < 3) {
	                    subscriber.onNext("onNext:" + i);
	                } else {
	                    subscriber.onError(new Exception("the nubmer is greater than 3"));
	                	//下面写法也是可以的
	                    /*try {
							throw new Exception("the nubmer is greater than 3");
							
						} catch (Exception e) {
							subscriber.onError(e);
						}*/
	                }
	            }
	        }
	    });
	}
	
	/**
	 * onErrorReturn
	 * 解释：让Observable遇到错误时发射一个特殊的项并且正常终止。 
	 */
	private static void test1(){
		createObserver()
		.onErrorReturn(new Func1<Throwable, String>() {

			@Override
			public String call(Throwable throwable) {
				
				return "do something";
			}
		})
		.subscribe(new Subscriber<String>() {

			@Override
			public void onCompleted() {
				System.out.println("onCompleted");		
			}
			
			@Override
			public void onNext(String value) {
				System.out.println("onSuccess value = " + value);
			}
			
			@Override
			public void onError(Throwable error) {
				System.out.println("onError error = " + error);
			}
		});
		
	}
	
	/**
	 * onErrorResumeNext
	 * 解释：让Observable在遇到错误时开始发射第二个Observable的数据序列 
	 * 
	 * 当原Observable发射onError消息时，会忽略onError消息，不会传递给观察者；
	 * 然后它会开始另一个备用的Observable，继续发射数据。
	 */
	private static void test2(){
		createObserver()
		.onErrorResumeNext(new Func1<Throwable, Observable<? extends String>>() {

			@Override
			public Observable<String> call(Throwable t) {
				return Observable.just("a","b","c");
			}
		})
		.subscribe(new Subscriber<String>() {
			
			@Override
			public void onCompleted() {
				System.out.println("onCompleted");		
			}
			
			@Override
			public void onNext(String value) {
				System.out.println("onSuccess value = " + value);
			}
			
			@Override
			public void onError(Throwable error) {
				System.out.println("onError error = " + error);
			}
		});
		
	}
	
	/**
	 * onExceptionResumeNext
	 * 解释：让Observable在遇到错误时开始发射第二个Observable的数据序列 
	 * 
	 * 和onErrorResumeNext类似，可以说是onErrorResumeNext的特例
	 * 区别是onErrorResumeNext操作符是当Observable发生错误或异常时触发，而onExceptionResumeNext是当Observable发生异常时才触发。
	 * 换句话说就是如果onError收到的Throwable不是一个Exception，它会将错误传递给观察者的onError方法，不会使用备用的Observable。
	 * 
	 * 关于Error和Exception的区别参考：http://blog.csdn.net/jdsjlzx/article/details/52448173
	 */
	private static void test3(){
		createObserver2()
		.onExceptionResumeNext(Observable.just("www.stay4it.com"))
		.subscribe(new Subscriber<String>() {
			
			@Override
			public void onCompleted() {
				System.out.println("onCompleted");		
			}
			
			@Override
			public void onNext(String value) {
				System.out.println("onSuccess value = " + value);
			}
			
			@Override
			public void onError(Throwable error) {
				System.out.println("onError error = " + error);
			}
		});
		
	}
	
	/**
	 * retry
	 * retry的意思就是试着重来，当原始Observable发射onError通知时，retry操作符不会把onError通知传递给观察者，
	 * 它会重新订阅这个Observable一次或者多次(意味着重新从头发射数据)，所以可能造成数据项重复发送的情况。
	 * 
	 * Javadoc: retry()：无论收到多少次onError通知，都会继续订阅并重发原始Observable，直到onCompleted（注意：会无限次尝试重新订阅）。
	 * Javadoc: retry(long)：接受count参数的retry会最多重新订阅count次，如果次数超过了就不会尝试再次订阅，它会把最新的一个onError通知传递给他的观察者。
	 * Javadoc: retry(Func2)： 这个版本的retry接受一个函数作为参数，这个函数的两个参数是：重试次数和导致发射onError通知的Throwable。这个函数返回一个布尔值，如果返回true，retry应该再次订阅和镜像原始的Observable，如果返回false，retry会将最新的一个onError通知传递给它的观察者。
	 * 
	 * test4()会无限次尝试重新订阅
	 */
	private static void test4(){
		createObserver()
		.retry(2)
		.subscribe(new Subscriber<String>() {
			
			@Override
			public void onCompleted() {
				System.out.println("onCompleted");		
			}
			
			@Override
			public void onNext(String value) {
				System.out.println("onSuccess value = " + value);
			}
			
			@Override
			public void onError(Throwable error) {
				System.out.println("onError error = " + error);
			}
		});
		
	}
	
	
	/**
	 * retry(Func2)
	 */
	private static void test5(){
		createObserver()
		.retry(new Func2<Integer, Throwable, Boolean>() {
			
			@Override
			public Boolean call(Integer t1, Throwable throwable) {
				System.out.println("发生错误了："+throwable.getMessage()+",第"+t1+"次重新订阅");	
				if(t1>2){
		            return false;//不再重新订阅
		        }
		        //此处也可以通过判断throwable来控制不同的错误不同处理
		        return true;
			}
		})
		.subscribe(new Subscriber<String>() {
			
			@Override
			public void onCompleted() {
				System.out.println("onCompleted");		
			}
			
			@Override
			public void onNext(String value) {
				System.out.println("onSuccess value = " + value);
			}
			
			@Override
			public void onError(Throwable error) {
				System.out.println("onError error = " + error);
			}
		});
		
	}
	

	/**
	 * retryWhen
	 * 解释：指示Observable遇到错误时，将错误传递给另一个Observable来决定是否要重新给订阅这个Observable
	 * 
	 * 应用场景：http://blog.csdn.net/jdsjlzx/article/details/51722365
	 * 
	 */
	static int retryCount = 0;
	private static void test6() throws InterruptedException{
		
		final int maxRetries = 3;

        Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onError(new RuntimeException("always fails"));
            }
        })
        .subscribeOn(Schedulers.immediate())
        .retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {

                    @Override
                    public Observable<?> call(Observable<? extends Throwable> observable) {
                        return observable.flatMap(new Func1<Throwable, Observable<?>>() {
                            @Override
                            public Observable<?> call(Throwable throwable) {
                                if (++retryCount <= maxRetries) {
                                    // When this Observable calls onNext, the original Observable will be retried (i.e. re-subscribed).
                                    System.out.println("get error, it will try after " + 1000 + " millisecond, retry count " + retryCount);
                                    return Observable.timer(1000, TimeUnit.MILLISECONDS);
                                }
                                return Observable.error(throwable);
                            }
                        });
                    }


                })
                .subscribe(new Subscriber<Integer>() {

                    @Override
                    public void onCompleted() {
                        System.out.println("onCompleted");
                    }

                    @Override
                    public void onNext(Integer value) {
                        System.out.println("onSuccess value = " + value);
                    }

                    @Override
                    public void onError(Throwable error) {
                        System.out.println("onError error = " + error);
                    }
                });
        Thread.sleep(4000); 
	}
	
	public static void main(String[] args) throws InterruptedException {
		test6();
		
	}
	
	
}
